-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36067][runtime] Support optimize stream graph based on input info. #25790
Conversation
9d3e01f
to
9b2cc2a
Compare
...e/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
Outdated
Show resolved
Hide resolved
...e/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
Show resolved
Hide resolved
9b2cc2a
to
ecc3183
Compare
@@ -207,6 +210,10 @@ public boolean isForward() { | |||
return intermediateDataSet.isForward(); | |||
} | |||
|
|||
public boolean isEveryConsumerConsumeAllSubPartitions() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused by why this method is checking broadcasts - how does this relate to downstream consumers being able to consume the subpartitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was indeed confusing before to have both broadcast
and EveryConsumerConsumeAllSubPartitions
present at the same time.
Let me explain the reason for introducing these changes: Broadcast was used to represent two specific modes for reading and writing. Writing required that a single subpartition contains all data, and reading corresponded to consuming that single subpartition.
In my opinion, broadcasting requires that the downstream consumes all produced data. The optimization is that the upstream could write all data to a single subpartition, and the downstream could consume it. These are different concepts.
So, in this PR, I introduced singleSubpartitionContainsAllData
to replace the original optimization concept. The broadcast
concept will change to mean the downstream consumes all produced data.
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultInfo.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this PR! @JunRuiLee
I have a few comments. PTAL.
...untime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/OperatorsFinished.java
Outdated
Show resolved
Hide resolved
...untime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/OperatorsFinished.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizationStrategy.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizationStrategy.java
Outdated
Show resolved
Hide resolved
...ime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizer.java
Outdated
Show resolved
Hide resolved
...e/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
Show resolved
Hide resolved
...c/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultInfo.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AbstractBlockingResultInfo.java
Outdated
Show resolved
Hide resolved
...e/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
Show resolved
Hide resolved
ff6840e
to
95f7597
Compare
...e/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
Outdated
Show resolved
Hide resolved
...e/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
Show resolved
Hide resolved
...c/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java
Show resolved
Hide resolved
bd01f20
to
79262b4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing all the comments. @JunRuiLee
LGTM.
…OptimizationStrategy
…artition info when all consumers created and initialized.
79262b4
to
1139e8b
Compare
1139e8b
to
2081ab0
Compare
What is the purpose of the change
[FLINK-36067][runtime] Support optimize stream graph based on input info.
Brief change log
Verifying this change
This change added tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation